[Rust] Axumを使ってMomentoにアクセスする
Introduction
この記事では、AWS Lambdaから
キャッシュサービスであるMomentoにアクセスしてみました。
Momentoはサーバレスアーキテクチャとの相性がとてもよいですが、
普通のWebアプリにも簡単に組み込むことができます。
本稿ではRustのWebフレームワークであるAxumを使ってMomentoにアクセスしてみます。
Momento?
ここでも解説しているとおり、
Momentoは、クラウドネイティブなサーバーレスキャッシュサービスです。
設定や管理はすべてMomentoが行い、
最適化やスケールを考慮する必要はありません。
常に最適な構成を提供してくれて、
料金も転送量のみ(送受信1GBあたり0.15USD)です。
プログラムに組み込むことも簡単で、各種SDKが提供されています。
今回はMomentoのCrateをAxumで使います。
Axum?
Axumはactix-webと並んで人気の
Webアプリケーションフレームワークです。
非同期ランタイムのTokioと同じチームが開発しています。
Axumについてはドキュメントなどをご確認ください。
Environment
Momentoはすでに設定済み(認証トークン取得済み)とします。
トークン取得についてはここなどをご確認ください。
- OS : MacOS 12.4
- Rust : 1.62.1
Create Application
Axumで雛形作成
最初はcargoでプロジェクトを生成します。
% cargo new axum-example && cd axum-example
Cargo.tomlの依存ライブラリを下記のように指定。
[dependencies] axum = "0.5.16" tokio = { version = "1.0", features = ["full"] } momento = "0.7.4" serde = "1.0.145" serde_json = "1.0.86"
次に、src/main.rsでルーティング設定やハンドラーの仮実装をします。
/postにキャッシュ名、キー名、値をJSONフォーマットでPOSTすると
Momentoにキャッシュを登録します。
/get/<キャッシュ名>/キー名
でGETするとMomentoからキャッシュ値の取得をします。
まずはリクエストとレスポンス用の構造体を定義します。
//パスパラメータ用 #[derive(Deserialize)] struct Params { cache_name: String, key: String, } //JSONリクエストボディ用 #[derive(Debug, Deserialize)] struct InputBody { cache_name: String, key: String, value: String, } //レスポンス用 #[derive(Debug, Serialize)] struct ResponseBody { message: String, }
GET用とPOST用のハンドラーも定義します。
//GET用 async fn handler_get( Path(Params { cache_name, key }): Path<Params>, ) -> impl IntoResponse { println!("{:?}", cache_name); println!("{:?}", key); ( StatusCode::OK, Json(json!({"message":"response!"})), ) } //POST用 async fn handler_post( Json(input): Json<InputBody>, ) -> impl IntoResponse { let cache_name = input.cache_name; let key = input.key; let value = input.value; println!("{:?}", cache_name); println!("{:?}", key); println!("{:?}", value); ( StatusCode::CREATED, Json(json!({"message":"cache set ok!"})), ) }
そしてmain関数の定義です。
main関数では、さきほどのハンドラーとURLパスを関連付け、
ルーティング定義をしてサーバを起動します。
#[tokio::main] async fn main() { let app = Router::new() .route("/get/:cache_name/:key", get(handler_get)) .route("/post", post(handler_post)); // run it let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("listening on {}", addr); axum::Server::bind(&addr) .serve(app.into_make_service()) .await .unwrap(); }
cargo run でアプリを起動してみます。
% cargo run -p axum-example Finished dev [unoptimized + debuginfo] target(s) in 0.27s Running `target/debug/axum-example` listening on 127.0.0.1:3000
この時点でもcurlなどでアクセスすれば結果を取得できます。
では次に各ハンドラーでMomentoにアクセスするように実装します。
Momentoの組み込み
Momentoにアクセスするためのクライアント(SimpleCacheClient)は
Arc
所有権を複数のスレッド間で共有できるようにします。
そして、状態を保持するために使用できるExtension機能を使って再利用します。
struct State { client: SimpleCacheClient, } type SharedState = Arc<Mutex<State>>;
なお、現在rcであるAxumのversion 0.6では、
ここにあるように、
extract::Stateをつかってもう少しシンプルに記述できるみたいです。
get/postハンドラーも修正します。
引数にExtension
そこからMomentoクライアントを取り出して使用します。
async fn handler_get( Extension(state): Extension<SharedState>, Path(Params { cache_name, key }): Path<Params>, ) -> impl IntoResponse { let client = &mut state.lock().await.client; println!("{:?}", cache_name); println!("{:?}", key); let result: MomentoGetResponse = client.get(&cache_name, key.clone()).await.unwrap(); let msg = format!( "key:{} , value:{}", key, String::from_utf8(result.value).unwrap() ); let response = ResponseBody { message: msg }; ( StatusCode::OK, Json(response), ) } async fn handler_post( Extension(state): Extension<SharedState>, Json(input): Json<InputBody>, ) -> impl IntoResponse { let client = &mut state.lock().await.client; let cache_name = input.cache_name; let key = input.key; let value = input.value; println!("{:?}", cache_name); println!("{:?}", key); println!("{:?}", value); client .set(&cache_name, key.clone(), value.clone(), None) .await .unwrap(); ( StatusCode::CREATED, Json(json!({"message":"cache set ok!"})), ) }
main関数も修正します。
init_momento_client関数でMomentoクライアントの初期化を行い、
ExtensionでMomentoクライアントを設定します。
async fn init_momento_client() -> Result<SimpleCacheClient, ()> { let env_token = env::var("token").expect("token is undefined."); let ttl = 60; match SimpleCacheClientBuilder::new(env_token, NonZeroU64::new(ttl).unwrap()) { Ok(client) => Ok(client.build()), Err(e) => panic!("Error : {:?}", e), } } #[tokio::main] async fn main() { // build our application with a route let moment_cli = init_momento_client().await.unwrap(); let state = Arc::new(Mutex::new(State { client: moment_cli })); let app = Router::new() .route("/get/:cache_name/:key", get(handler_get)) .route("/post", post(handler_post)) .layer(Extension(state)); ・・・ }
最終的なmain.rsは下記のようになります。
use axum::{ extract::Extension, extract::Path, http::StatusCode, response::IntoResponse, routing::get, routing::post, Json, Router, }; use std::env; use std::net::SocketAddr; use std::num::NonZeroU64; use std::sync::Arc; use tokio::sync::Mutex; use serde::{Deserialize, Serialize}; use serde_json::json; use momento::response::cache_get_response::MomentoGetResponse; use momento::simple_cache_client::{SimpleCacheClient, SimpleCacheClientBuilder}; #[derive(Deserialize)] struct Params { cache_name: String, key: String, } #[derive(Debug, Deserialize)] struct InputBody { cache_name: String, key: String, value: String, } #[derive(Debug, Serialize)] struct ResponseBody { message: String, } struct State { client: SimpleCacheClient, } type SharedState = Arc<Mutex<State>>; async fn init_momento_client() -> Result<SimpleCacheClient, ()> { let env_token = env::var("token").expect("token is undefined."); let ttl = 60; match SimpleCacheClientBuilder::new(env_token, NonZeroU64::new(ttl).unwrap()) { Ok(client) => Ok(client.build()), Err(e) => panic!("Error : {:?}", e), } } async fn handler_get( Extension(state): Extension<SharedState>, Path(Params { cache_name, key }): Path<Params>, ) -> impl IntoResponse { let client = &mut state.lock().await.client; println!("{:?}", cache_name); println!("{:?}", key); let result: MomentoGetResponse = client.get(&cache_name, key.clone()).await.unwrap(); let msg = format!( "key:{} , value:{}", key, String::from_utf8(result.value).unwrap() ); let response = ResponseBody { message: msg }; ( StatusCode::OK, Json(response), ) } async fn handler_post( Extension(state): Extension<SharedState>, Json(input): Json<InputBody>, ) -> impl IntoResponse { let client = &mut state.lock().await.client; let cache_name = input.cache_name; let key = input.key; let value = input.value; println!("{:?}", cache_name); println!("{:?}", key); println!("{:?}", value); client .set(&cache_name, key.clone(), value.clone(), None) .await .unwrap(); ( StatusCode::CREATED, Json(json!({"message":"cache set ok!"})), ) } #[tokio::main] async fn main() { // build our application with a route let moment_cli = init_momento_client().await.unwrap(); let state = Arc::new(Mutex::new(State { client: moment_cli })); let app = Router::new() .route("/get/:cache_name/:key", get(handler_get)) .route("/post", post(handler_post)) .layer(Extension(state)); // run it let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); println!("listening on {}", addr); axum::Server::bind(&addr) .serve(app.into_make_service()) .await .unwrap(); }
動作確認
では動作確認してみましょう。
token環境変数に認証トークンをセットします。
% export token=<Momentoの認証トークン>
アプリを起動します。
% cargo run -p axum-example Finished dev [unoptimized + debuginfo] target(s) in 0.27s Running `target/debug/axum-example` listening on 127.0.0.1:3000
サーバが起動したらcurlを使ってキャッシュデータの登録と取得を実行してみましょう。
Momentoにアクセスできていることがわかります。
# Create Cache Data % curl -X POST -H "Content-Type:application/json" http://127.0.0.1:3000/post -d '{"cache_name":"default-cache","key":"mm_key","value":"mm_value"}' {"message":"cache set ok!"}% # Get Cache Data % curl http://127.0.0.1:3000/get/default-cache/mm_key {"message":"key:mm_key , value:mm_value"}%
Summary
今回はAxumからMomentoにアクセスしてみました。
ExtensionをつかってMomentoクライアントを使い回す以外は
普通にMomentoへアクセスするのと変わりはありません。
AWS Lambdaでも通常のWebアプリでも簡単にMomentoを使うことができます。
Momentoセミナーのお知らせ
2022年11月11日(金) 16:00からMomentoのセミナーを開催します。
興味があるかたはぜひご参加ください。